package com.xzx.spark.tuning.reduce

import com.xzx.spark.tuning.utils.InitUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 *
 * ${DESCRIPTION}
 *
 * @author xinzhixuan
 * @version 1.0
 * @date 2022-03-24 10:31 PM
 */
object DynamicPartitionSmallFileTuning {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DynamicPartitionSmallFileTuning")
      .set("spark.sql.shuffle.partitions", "8")
      .setMaster("local[*]") //TODO 要打包提交集群执行，注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    // 意味着dt 和 dn 分区下数据量比较大的单独插入
    sparkSession.sql(
      """
        |CREATE TABLE if not exists `sparktuning`.`dynamic_csc` (
        |  `courseid` BIGINT,
        |  `coursename` STRING,
        |  `createtime` STRING,
        |  `discount` STRING,
        |  `orderid` STRING,
        |  `sellmoney` STRING,
        |  `dt` STRING,
        |  `dn` STRING)
        |USING parquet
        |PARTITIONED BY (dt, dn)
         """.stripMargin)

    // TODO 非倾斜分区写入
    sparkSession.sql(
      """
        |insert overwrite sparktuning.dynamic_csc partition(dt,dn)
        |select * from sparktuning.course_shopping_cart
        |where dt!='20190722' and dn!='webA'
        |distribute by dt,dn
      """.stripMargin)

    // TODO 倾斜分区打散写入
    sparkSession.sql(
      """
        |insert overwrite sparktuning.dynamic_csc partition(dt,dn)
        |select * from sparktuning.course_shopping_cart
        |where dt='20190722' and dn='webA'
        |distribute by cast(rand() * 5 as int)
      """.stripMargin)


    InitUtil.hangOn()
  }

}
